View Javadoc

1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.junit.runner.Description;
23  import org.junit.runners.model.RunnerScheduler;
24  
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Set;
28  import java.util.concurrent.CopyOnWriteArraySet;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.ThreadPoolExecutor;
32  
33  /**
34   * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
35   * a master scheduler can shutdown slaves.
36   * <p/>
37   * The scheduler objects should be first created (and wired) and set in runners
38   * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
39   * <p/>
40   * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
41   *
42   * @author Tibor Digana (tibor17)
43   * @since 2.16
44   */
45  public class Scheduler
46      implements RunnerScheduler
47  {
48      private final Balancer balancer;
49  
50      private final SchedulingStrategy strategy;
51  
52      private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
53  
54      private final Description description;
55  
56      private volatile boolean shutdown = false;
57  
58      private volatile boolean started = false;
59  
60      private volatile Controller masterController;
61  
62      /**
63       * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
64       * <p/>
65       * You can use it with one infinite thread pool shared in strategies across all
66       * suites, class runners, etc.
67       */
68      public Scheduler( Description description, SchedulingStrategy strategy )
69      {
70          this( description, strategy, -1 );
71      }
72  
73      /**
74       * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
75       * <p/>
76       * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
77       * {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy)}
78       * or {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
79       *
80       * @param description description of current runner
81       * @param strategy    scheduling strategy with a shared thread pool
82       * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
83       * @throws NullPointerException if null <tt>strategy</tt>
84       */
85      public Scheduler( Description description, SchedulingStrategy strategy, int concurrency )
86      {
87          this( description, strategy, BalancerFactory.createBalancer( concurrency ) );
88      }
89  
90      /**
91       * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
92       * against other groups of schedulers. The schedulers share one pool.
93       * <p/>
94       * Unlike in {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)} which was limiting
95       * the <tt>concurrency</tt> of children of a runner where this scheduler was set, <em>this</em> <tt>balancer</tt>
96       * is limiting the concurrency of all children in runners having schedulers created by this constructor.
97       *
98       * @param description description of current runner
99       * @param strategy    scheduling strategy which may share threads with other strategy
100      * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
101      * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt>
102      */
103     public Scheduler( Description description, SchedulingStrategy strategy, Balancer balancer )
104     {
105         strategy.setDefaultShutdownHandler( newShutdownHandler() );
106         this.description = description;
107         this.strategy = strategy;
108         this.balancer = balancer;
109         masterController = null;
110     }
111 
112     /**
113      * Can be used by e.g. a runner having parallel classes in use case with parallel
114      * suites, classes and methods sharing the same thread pool.
115      *
116      * @param description     description of current runner
117      * @param masterScheduler scheduler sharing own threads with this slave
118      * @param strategy        scheduling strategy for this scheduler
119      * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
120      * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt>
121      */
122     public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy,
123                       Balancer balancer )
124     {
125         this( description, strategy, balancer );
126         strategy.setDefaultShutdownHandler( newShutdownHandler() );
127         masterScheduler.register( this );
128     }
129 
130     /**
131      * @param masterScheduler a reference to {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)}
132      *                        or {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy)}
133      * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy)
134      * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)
135      */
136     public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy, int concurrency )
137     {
138         this( description, strategy, concurrency );
139         strategy.setDefaultShutdownHandler( newShutdownHandler() );
140         masterScheduler.register( this );
141     }
142 
143     /**
144      * Should be used with individual pools on suites, classes and methods, see
145      * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
146      * <p/>
147      * Cached thread pool is infinite and can be always shared.
148      */
149     public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy )
150     {
151         this( description, masterScheduler, strategy, 0 );
152     }
153 
154     private void setController( Controller masterController )
155     {
156         if ( masterController == null )
157         {
158             throw new NullPointerException( "null ExecutionController" );
159         }
160         this.masterController = masterController;
161     }
162 
163     /**
164      * @param slave a slave scheduler to register
165      * @return <tt>true</tt> if successfully registered the <tt>slave</tt>.
166      */
167     private boolean register( Scheduler slave )
168     {
169         boolean canRegister = slave != null && slave != this;
170         if ( canRegister )
171         {
172             Controller controller = new Controller( slave );
173             canRegister = !slaves.contains( controller );
174             if ( canRegister )
175             {
176                 slaves.add( controller );
177                 slave.setController( controller );
178             }
179         }
180         return canRegister;
181     }
182 
183     /**
184      * @return <tt>true</tt> if new tasks can be scheduled.
185      */
186     private boolean canSchedule()
187     {
188         return !shutdown && ( masterController == null || masterController.canSchedule() );
189     }
190 
191     protected void logQuietly( Throwable t )
192     {
193         t.printStackTrace( System.err );
194     }
195 
196     protected void logQuietly( String msg )
197     {
198         System.err.println( msg );
199     }
200 
201     /**
202      * Attempts to stop all actively executing tasks and immediately returns a collection
203      * of descriptions of those tasks which have started prior to this call.
204      * <p/>
205      * This scheduler and other registered schedulers will shutdown, see {@link #register(Scheduler)}.
206      * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
207      *
208      * @param shutdownNow if <tt>true</tt> interrupts waiting methods
209      * @return collection of descriptions started before shutting down
210      */
211     public Collection<Description> shutdown( boolean shutdownNow )
212     {
213         shutdown = true;
214         ArrayList<Description> activeChildren = new ArrayList<Description>();
215 
216         if ( started && description != null )
217         {
218             activeChildren.add( description );
219         }
220 
221         for ( Controller slave : slaves )
222         {
223             try
224             {
225                 activeChildren.addAll( slave.shutdown( shutdownNow ) );
226             }
227             catch ( Throwable t )
228             {
229                 logQuietly( t );
230             }
231         }
232 
233         try
234         {
235             balancer.releaseAllPermits();
236         }
237         finally
238         {
239             if ( shutdownNow )
240             {
241                 strategy.stopNow();
242             }
243             else
244             {
245                 strategy.stop();
246             }
247         }
248 
249         return activeChildren;
250     }
251 
252     protected void beforeExecute()
253     {
254     }
255 
256     protected void afterExecute()
257     {
258     }
259 
260     public void schedule( Runnable childStatement )
261     {
262         if ( childStatement == null )
263         {
264             logQuietly( "cannot schedule null" );
265         }
266         else if ( canSchedule() && strategy.canSchedule() )
267         {
268             try
269             {
270                 boolean isNotInterrupted = balancer.acquirePermit();
271                 if ( isNotInterrupted && !shutdown )
272                 {
273                     Runnable task = wrapTask( childStatement );
274                     strategy.schedule( task );
275                     started = true;
276                 }
277             }
278             catch ( RejectedExecutionException e )
279             {
280                 shutdown( false );
281             }
282             catch ( Throwable t )
283             {
284                 balancer.releasePermit();
285                 logQuietly( t );
286             }
287         }
288     }
289 
290     public void finished()
291     {
292         try
293         {
294             strategy.finished();
295         }
296         catch ( InterruptedException e )
297         {
298             logQuietly( e );
299         }
300         finally
301         {
302             for ( Controller slave : slaves )
303             {
304                 slave.awaitFinishedQuietly();
305             }
306         }
307     }
308 
309     private Runnable wrapTask( final Runnable task )
310     {
311         return new Runnable()
312         {
313             public void run()
314             {
315                 try
316                 {
317                     beforeExecute();
318                     task.run();
319                 }
320                 finally
321                 {
322                     try
323                     {
324                         afterExecute();
325                     }
326                     finally
327                     {
328                         balancer.releasePermit();
329                     }
330                 }
331             }
332         };
333     }
334 
335     protected ShutdownHandler newShutdownHandler()
336     {
337         return new ShutdownHandler();
338     }
339 
340     /**
341      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
342      */
343     private final class Controller
344     {
345         private final Scheduler slave;
346 
347         private Controller( Scheduler slave )
348         {
349             this.slave = slave;
350         }
351 
352         /**
353          * @return <tt>true</tt> if new children can be scheduled.
354          */
355         boolean canSchedule()
356         {
357             return Scheduler.this.canSchedule();
358         }
359 
360         void awaitFinishedQuietly()
361         {
362             try
363             {
364                 slave.finished();
365             }
366             catch ( Throwable t )
367             {
368                 slave.logQuietly( t );
369             }
370         }
371 
372         Collection<Description> shutdown( boolean shutdownNow )
373         {
374             return slave.shutdown( shutdownNow );
375         }
376 
377         @Override
378         public int hashCode()
379         {
380             return slave.hashCode();
381         }
382 
383         @Override
384         public boolean equals( Object o )
385         {
386             return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
387         }
388     }
389 
390     public class ShutdownHandler
391         implements RejectedExecutionHandler
392     {
393         private volatile RejectedExecutionHandler poolHandler;
394 
395         protected ShutdownHandler()
396         {
397             poolHandler = null;
398         }
399 
400         public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
401         {
402             this.poolHandler = poolHandler;
403         }
404 
405         public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
406         {
407             if ( executor.isShutdown() )
408             {
409                 shutdown( false );
410             }
411             final RejectedExecutionHandler poolHandler = this.poolHandler;
412             if ( poolHandler != null )
413             {
414                 poolHandler.rejectedExecution( r, executor );
415             }
416         }
417     }
418 }